/*
 * CounterDataLogger.java
 *
 * Tigase Jabber/XMPP Server
 * Copyright (C) 2004-2017 "Tigase, Inc." <office@tigase.com>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, version 3 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. Look for COPYING file in the top folder.
 * If not, see http://www.gnu.org/licenses/.
 */

package tigase.stats;

import tigase.db.DBInitException;
import tigase.db.DataRepository;
import tigase.db.RepositoryFactory;
import tigase.kernel.beans.Initializable;
import tigase.kernel.beans.config.ConfigField;
import tigase.kernel.beans.config.ConfigurationChangedAware;
import tigase.util.dns.DNSResolverFactory;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Statistics archivizer which periodically stores a snapshot of selected server counters
 * in the {@value #STATS_TABLE} SQL table.
 * <p>
 * The repository connection is (re)created in {@link #beanConfigurationChanged(Collection)} from
 * the configured database URL. Each call to {@link #execute(StatisticsProvider)} inserts one row;
 * packet/stanza counters are stored as the difference from the value seen on the previous call.
 * <p>
 * Created: Apr 20, 2010 6:39:05 PM
 *
 * @author <a href="mailto:artur.hefczyc@tigase.org">Artur Hefczyc</a>
 */
public class CounterDataLogger
		implements StatisticsArchivizerIfc, ConfigurationChangedAware, Initializable {

	/**
	 * Column for the value of {@code getCompConnections("bosh")}; created as {@code int not null default 0}.
	 */
	public static final String BOSH_CONNS_COL = "bosh_conns";

	/**
	 * Column for the value of {@code getCompConnections("c2s")}; created as {@code int not null default 0}.
	 */
	public static final String C2S_CONNS_COL = "c2s_conns";

	/**
	 * Column for the change of {@code getCompPackets("c2s")} since the previous entry; created as
	 * {@code bigint not null default 0}.
	 */
	public static final String C2S_PACKETS_COL = "c2s_packets";

	/**
	 * Column for the reported CPU usage (negative readings are stored as 0); created as
	 * {@code double precision not null default 0}.
	 */
	public static final String CPU_USAGE_COL = "cpu_usage";

	/**
	 * Configuration alias under which the database URL of this logger is provided.
	 */
	public static final String DB_URL_PROP_KEY = "db-url";

	/**
	 * Column for the change of {@code getCompPackets("ext")} since the previous entry; created as
	 * {@code bigint not null default 0}.
	 */
	public static final String EXT_PACKETS_COL = "ext_packets";

	/**
	 * Column for the name of the host which wrote the entry; created as {@code varchar(2049) NOT NULL}
	 * ({@code nvarchar(2049)} on SQL Server). Part of the primary key together with the timestamp.
	 */
	public static final String HOSTNAME_COL = "hostname";

	/**
	 * Column for the change of {@code getCompIqs("sess-man")} since the previous entry; created as
	 * {@code bigint not null default 0}.
	 */
	public static final String IQS_COL = "iqs";

	/**
	 * Column for the reported heap memory usage; created as {@code double precision not null default 0}.
	 */
	public static final String MEM_USAGE_COL = "mem_usage";

	/**
	 * Column for the change of {@code getCompMessages("sess-man")} since the previous entry; created as
	 * {@code bigint not null default 0}.
	 */
	public static final String MESSAGES_COL = "messages";

	/**
	 * Column for the change of {@code getCompPackets("muc")} since the previous entry; created as
	 * {@code bigint not null default 0}.
	 */
	public static final String MUC_PACKETS_COL = "muc_packets";

	/**
	 * Column for the change of {@code getCompPresences("sess-man")} since the previous entry; created as
	 * {@code bigint not null default 0}.
	 */
	public static final String PRESENCES_COL = "presences";

	/**
	 * Column for the change of {@code getCompPackets("pubsub")} since the previous entry; created as
	 * {@code bigint not null default 0}.
	 */
	public static final String PUBSUB_PACKETS_COL = "pubsub_packets";

	/**
	 * Column for the value of {@code getCompConnections("s2s")}; created as {@code int not null default 0}.
	 */
	public static final String S2S_CONNS_COL = "s2s_conns";

	/**
	 * Column for the change of {@code getCompPackets("s2s")} since the previous entry; created as
	 * {@code bigint not null default 0}.
	 */
	public static final String S2S_PACKETS_COL = "s2s_packets";

	/**
	 * Column for the change of {@code getSMPacketsNumber()} since the previous entry; created as
	 * {@code bigint not null default 0}.
	 */
	public static final String SM_PACKETS_COL = "sm_packets";

	/**
	 * Name of the table the statistics entries are written to.
	 */
	public static final String STATS_TABLE = "tig_stats_log";

	/**
	 * Column for the reported uptime; created as {@code bigint not null default 0}.
	 */
	public static final String UPTIME_COL = "uptime";

	/**
	 * Column for the "Number of VHosts" statistic of {@code vhost-man}; created as
	 * {@code int not null default 0}.
	 */
	public static final String VHOSTS_COL = "vhosts";

	/**
	 * Column for the value of {@code getRegistered()}; created as {@code bigint not null default 0}.
	 */
	public static final String REGISTERED_COL = "registered";

	private static final Logger log = Logger.getLogger(CounterDataLogger.class.getName());

	/**
	 * Definitions of all counter columns, shared verbatim by every database-specific
	 * {@code CREATE TABLE} statement below. Starts with a space and ends with a comma so it can be
	 * placed between the hostname column and the primary key clause.
	 */
	private static final String COUNTER_COLUMNS_DDL =
			" " + CPU_USAGE_COL + " double precision not null default 0," +
					" " + MEM_USAGE_COL + " double precision not null default 0," +
					" " + UPTIME_COL + " bigint not null default 0," +
					" " + VHOSTS_COL + " int  not null default 0," +
					" " + SM_PACKETS_COL + " bigint  not null default 0," +
					" " + MUC_PACKETS_COL + " bigint  not null default 0," +
					" " + PUBSUB_PACKETS_COL + " bigint  not null default 0," +
					" " + C2S_PACKETS_COL + " bigint  not null default 0," +
					" " + S2S_PACKETS_COL + " bigint  not null default 0," +
					" " + EXT_PACKETS_COL + " bigint  not null default 0," +
					" " + PRESENCES_COL + " bigint  not null default 0," +
					" " + MESSAGES_COL + " bigint  not null default 0," +
					" " + IQS_COL + " bigint  not null default 0," +
					" " + REGISTERED_COL + " bigint  not null default 0," +
					" " + C2S_CONNS_COL + " int  not null default 0," +
					" " + S2S_CONNS_COL + " int  not null default 0," +
					" " + BOSH_CONNS_COL + " int  not null default 0,";

	/**
	 * Table definition used for MySQL, PostgreSQL and any database type without a dedicated variant.
	 * <p>
	 * NOTE(review): the {@code hostname(255)} key-prefix form in the primary key looks MySQL-specific,
	 * yet this statement is also used for PostgreSQL and unknown types - confirm it is accepted there.
	 */
	private static final String CREATE_STATS_TABLE =
			"create table " + STATS_TABLE + " ( " + " lid serial," + " ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP," +
					" " + HOSTNAME_COL + " varchar(2049) NOT NULL," + COUNTER_COLUMNS_DDL + " primary key(ts," +
					HOSTNAME_COL + "(255)))";

	/**
	 * Table definition used for Derby.
	 */
	private static final String DERBY_CREATE_STATS_TABLE =
			"create table " + STATS_TABLE + " ( " + " lid bigint generated by default as identity not null," +
					" ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP," + " " + HOSTNAME_COL + " varchar(2049) NOT NULL," +
					COUNTER_COLUMNS_DDL + " primary key(ts," + HOSTNAME_COL + "))";

	/**
	 * Table definition used for SQL Server (both {@code jtds} and {@code sqlserver} database types).
	 */
	private static final String SQLSERVER_CREATE_STATS_TABLE =
			"create table " + STATS_TABLE + " ( " + " lid [bigint] IDENTITY(1,1)," +
					" ts [datetime] DEFAULT  getdate()," + " " + HOSTNAME_COL + " [nvarchar](2049) NOT NULL," +
					COUNTER_COLUMNS_DDL + " primary key(ts," + HOSTNAME_COL + "))";

	/**
	 * Insert statement for a single statistics entry. The text is also used as the key under which
	 * the prepared statement is registered in the repository.
	 */
	private static final String STATS_INSERT_QUERY =
			"insert into " + STATS_TABLE + "(" + CPU_USAGE_COL + ", " + MEM_USAGE_COL + ", " + UPTIME_COL + ", " +
					VHOSTS_COL + ", " + SM_PACKETS_COL + ", " + MUC_PACKETS_COL + ", " + PUBSUB_PACKETS_COL + ", " +
					C2S_PACKETS_COL + ", " + S2S_PACKETS_COL + ", " + EXT_PACKETS_COL + ", " + PRESENCES_COL + ", " +
					MESSAGES_COL + ", " + IQS_COL + ", " + REGISTERED_COL + ", " + C2S_CONNS_COL + ", " +
					S2S_CONNS_COL + ", " + BOSH_CONNS_COL + ", " + HOSTNAME_COL +
					") values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

	/**
	 * Host name written to the {@link #HOSTNAME_COL} column; refreshed on every configuration change.
	 */
	protected static String defaultHostname;

	/**
	 * Repository used for inserts; stays {@code null} until {@link #initRepository(String, Map)}
	 * completes successfully.
	 */
	private DataRepository data_repo = null;
	@ConfigField(desc = "Database URL", alias = DB_URL_PROP_KEY)
	private String databaseUrl = null;
	@ConfigField(desc = "Frequency")
	private long frequency = -1;
	// Counter values seen on the previous execute() call, used to compute per-entry deltas.
	private long last_c2s_packets = 0;
	private long last_ext_packets = 0;
	private long last_iqs = 0;
	private long last_messages = 0;
	private long last_muc_packets = 0;
	private long last_presences = 0;
	private long last_pubsub_packets = 0;
	private long last_s2s_packets = 0;
	private long last_sm_packets = 0;

	/**
	 * Inserts a single entry into the statistics table. The host name column is filled with
	 * {@link #defaultHostname}.
	 * <p>
	 * If the repository has not been initialized the entry is skipped with a warning. SQL errors
	 * are logged and not propagated.
	 *
	 * @param cpu_usage CPU usage; negative values are stored as {@code 0}
	 * @param mem_usage memory usage
	 * @param uptime server uptime
	 * @param vhosts number of virtual hosts
	 * @param sm_packets value for the {@link #SM_PACKETS_COL} column
	 * @param muc_packets value for the {@link #MUC_PACKETS_COL} column
	 * @param pubsub_packets value for the {@link #PUBSUB_PACKETS_COL} column
	 * @param c2s_packets value for the {@link #C2S_PACKETS_COL} column
	 * @param s2s_packets value for the {@link #S2S_PACKETS_COL} column
	 * @param ext_packets value for the {@link #EXT_PACKETS_COL} column
	 * @param presences value for the {@link #PRESENCES_COL} column
	 * @param messages value for the {@link #MESSAGES_COL} column
	 * @param iqs value for the {@link #IQS_COL} column
	 * @param registered value for the {@link #REGISTERED_COL} column
	 * @param c2s_conns value for the {@link #C2S_CONNS_COL} column
	 * @param s2s_conns value for the {@link #S2S_CONNS_COL} column
	 * @param bosh_conns value for the {@link #BOSH_CONNS_COL} column
	 */
	public void addStatsLogEntry(float cpu_usage, float mem_usage, long uptime, int vhosts, long sm_packets,
								 long muc_packets, long pubsub_packets, long c2s_packets, long s2s_packets,
								 long ext_packets, long presences, long messages, long iqs, long registered,
								 int c2s_conns, int s2s_conns, int bosh_conns) {
		// Read the field once: initialization failures are only logged, so it may still be null.
		DataRepository repo = data_repo;

		if (repo == null) {
			log.log(Level.WARNING, "Statistics repository is not initialized, skipping stats log entry");

			return;
		}
		try {
			PreparedStatement insert_stats = repo.getPreparedStatement(null, STATS_INSERT_QUERY);

			// The statement object is guarded so parameters of concurrent inserts do not interleave.
			synchronized (insert_stats) {
				insert_stats.setFloat(1, ((cpu_usage >= 0f) ? cpu_usage : 0f));
				insert_stats.setFloat(2, mem_usage);
				insert_stats.setLong(3, uptime);
				insert_stats.setInt(4, vhosts);
				insert_stats.setLong(5, sm_packets);
				insert_stats.setLong(6, muc_packets);
				insert_stats.setLong(7, pubsub_packets);
				insert_stats.setLong(8, c2s_packets);
				insert_stats.setLong(9, s2s_packets);
				insert_stats.setLong(10, ext_packets);
				insert_stats.setLong(11, presences);
				insert_stats.setLong(12, messages);
				insert_stats.setLong(13, iqs);
				insert_stats.setLong(14, registered);
				insert_stats.setInt(15, c2s_conns);
				insert_stats.setInt(16, s2s_conns);
				insert_stats.setInt(17, bosh_conns);
				insert_stats.setString(18, defaultHostname);
				insert_stats.executeUpdate();
			}
		} catch (SQLException e) {
			log.log(Level.WARNING, "Problem adding new entry to DB: ", e);
		}
	}

	/**
	 * Reads current counters from the provider and stores one statistics entry. Packet, presence,
	 * message and IQ counters are stored as the difference from the previous call; connection
	 * numbers, registered users, vhosts, uptime and CPU/memory usage are stored as reported.
	 *
	 * @param sp source of the statistics values
	 */
	@Override
	public void execute(StatisticsProvider sp) {
		long c2s_packets = sp.getCompPackets("c2s");
		long ext_packets = sp.getCompPackets("ext");
		long iqs = sp.getCompIqs("sess-man");
		long messages = sp.getCompMessages("sess-man");
		long muc_packets = sp.getCompPackets("muc");
		long presences = sp.getCompPresences("sess-man");
		long pubsub_packets = sp.getCompPackets("pubsub");
		long s2s_packets = sp.getCompPackets("s2s");
		long sm_packets = sp.getSMPacketsNumber();

		addStatsLogEntry(sp.getCPUUsage(), sp.getHeapMemUsage(), sp.getUptime(),
						 sp.getStats("vhost-man", "Number of VHosts", 0), sm_packets - last_sm_packets,
						 muc_packets - last_muc_packets, pubsub_packets - last_pubsub_packets,
						 c2s_packets - last_c2s_packets, s2s_packets - last_s2s_packets, ext_packets - last_ext_packets,
						 presences - last_presences, messages - last_messages, iqs - last_iqs, sp.getRegistered(),
						 sp.getCompConnections("c2s"), sp.getCompConnections("s2s"), sp.getCompConnections("bosh"));

		// Remember the absolute values so the next entry again contains only the increase.
		last_c2s_packets = c2s_packets;
		last_ext_packets = ext_packets;
		last_iqs = iqs;
		last_messages = messages;
		last_muc_packets = muc_packets;
		last_presences = presences;
		last_pubsub_packets = pubsub_packets;
		last_s2s_packets = s2s_packets;
		last_sm_packets = sm_packets;
	}

	/**
	 * @return configured value of the {@code frequency} field ({@code -1} when not configured)
	 */
	@Override
	public long getFrequency() {
		return frequency;
	}

	/**
	 * Initializes the repository: obtains it from the {@link RepositoryFactory}, makes sure the
	 * statistics table is checked/created and registers the insert statement.
	 * <p>
	 * The repository becomes the one used by {@link #addStatsLogEntry} only after all steps succeed,
	 * so a failure never leaves a partially initialized repository in use.
	 *
	 * @param conn_str database connection URI
	 * @param map map of the additional parameters
	 *
	 * @throws SQLException if checking the table or preparing the insert statement fails
	 * @throws ClassNotFoundException as reported by the repository factory
	 * @throws IllegalAccessException as reported by the repository factory
	 * @throws InstantiationException as reported by the repository factory
	 * @throws DBInitException as reported by the repository factory
	 */
	public void initRepository(String conn_str, Map<String, String> map)
			throws SQLException, ClassNotFoundException, IllegalAccessException, InstantiationException,
				   DBInitException {
		log.log(Level.INFO, "Initializing dbAccess for db connection url: {0}", conn_str);

		DataRepository repo = RepositoryFactory.getDataRepository(null, conn_str, map);

		// Check if DB is correctly setup and contains all required tables.
		checkDB(repo);
		repo.initPreparedStatement(STATS_INSERT_QUERY, STATS_INSERT_QUERY);

		// Publish only a fully prepared repository.
		data_repo = repo;
	}

	/**
	 * Does nothing; this logger holds no resources which it releases itself.
	 */
	@Override
	public void release() {
	}

	/**
	 * Performs the initial setup by running the configuration-changed handler with no changed fields.
	 */
	@Override
	public void initialize() {
		beanConfigurationChanged(Collections.emptyList());
	}

	/**
	 * Refreshes {@link #defaultHostname} and re-initializes the repository from the configured
	 * database URL. The list of changed fields is not inspected. Any initialization failure is
	 * logged and not propagated.
	 *
	 * @param changedFields names of the changed configuration fields (unused)
	 */
	@Override
	public void beanConfigurationChanged(Collection<String> changedFields) {
		defaultHostname = DNSResolverFactory.getInstance().getDefaultHost();
		try {
			initRepository(databaseUrl, null);
		} catch (Exception ex) {
			log.log(Level.SEVERE, "Cannot initialize connection to database: ", ex);
		}
	}

	/**
	 * Asks the repository to check the statistics table, passing the {@code CREATE TABLE}
	 * statement matching the repository's database type.
	 *
	 * @param repo repository to check
	 *
	 * @throws SQLException if the repository reports a problem while checking the table
	 */
	private void checkDB(DataRepository repo) throws SQLException {
		DataRepository.dbTypes databaseType = repo.getDatabaseType();

		switch (databaseType) {
			case derby:
				repo.checkTable(STATS_TABLE, DERBY_CREATE_STATS_TABLE);

				break;

			case jtds:
			case sqlserver:
				repo.checkTable(STATS_TABLE, SQLSERVER_CREATE_STATS_TABLE);

				break;

			case postgresql:
			case mysql:
			default:
				repo.checkTable(STATS_TABLE, CREATE_STATS_TABLE);

				break;
		}
	}
}
